package org.hong.monkey.broadcast

import java.util.concurrent.atomic.AtomicLong

import org.hong.monkey.util.Utils
import org.hong.monkey.{Logging, MonkeyConf}

import scala.reflect.ClassTag

private[monkey] class BroadcastManager(
    val isDriver: Boolean,
    conf: MonkeyConf)
  extends Logging {

  private var initialized = false
  private var broadcastFactory: BroadcastFactory = null

  initialize()

  // Called by MonkeyContext or Executor before using Broadcast
  private def initialize() {
    synchronized {
      if (!initialized) {
        val broadcastFactoryClass =
          conf.get("monkey.broadcast.factory", "org.hong.monkey.broadcast.TorrentBroadcastFactory")

        broadcastFactory =
          Utils.classForName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]

        // Initialize appropriate BroadcastFactory and BroadcastObject
        broadcastFactory.initialize(isDriver, conf)

        initialized = true
      }
    }
  }

  def stop() {
    broadcastFactory.stop()
  }

  private val nextBroadcastId = new AtomicLong(0)

  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
    broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
  }

  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
    broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
  }
}
